1   /*
2    * Copyright (C) 2007 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.eventbus;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  
21  import com.google.common.annotations.Beta;
22  
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  
26  /**
27   * An {@link EventBus} that takes the Executor of your choice and uses it to
28   * dispatch events, allowing dispatch to occur asynchronously.
29   *
30   * @author Cliff Biffle
31   * @since 10.0
32   */
33  @Beta
34  public class AsyncEventBus extends EventBus {
35    private final Executor executor;
36  
37    /** the queue of events is shared across all threads */
38    private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
39        new ConcurrentLinkedQueue<EventWithSubscriber>();
40  
41    /**
42     * Creates a new AsyncEventBus that will use {@code executor} to dispatch
43     * events.  Assigns {@code identifier} as the bus's name for logging purposes.
44     *
45     * @param identifier short name for the bus, for logging purposes.
46     * @param executor   Executor to use to dispatch events. It is the caller's
47     *        responsibility to shut down the executor after the last event has
48     *        been posted to this event bus.
49     */
50    public AsyncEventBus(String identifier, Executor executor) {
51      super(identifier);
52      this.executor = checkNotNull(executor);
53    }
54  
55    /**
56     * Creates a new AsyncEventBus that will use {@code executor} to dispatch
57     * events.
58     *
59     * @param executor Executor to use to dispatch events. It is the caller's
60     *        responsibility to shut down the executor after the last event has
61     *        been posted to this event bus.
62     * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
63     *    See {@link SubscriberExceptionHandler} for more information.
64     * @since 16.0
65     */
66    public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
67      super(subscriberExceptionHandler);
68      this.executor = checkNotNull(executor);
69    }
70  
71    /**
72     * Creates a new AsyncEventBus that will use {@code executor} to dispatch
73     * events.
74     *
75     * @param executor Executor to use to dispatch events. It is the caller's
76     *        responsibility to shut down the executor after the last event has
77     *        been posted to this event bus.
78     */
79    public AsyncEventBus(Executor executor) {
80      super("default");
81      this.executor = checkNotNull(executor);
82    }
83  
84    @Override
85    void enqueueEvent(Object event, EventSubscriber subscriber) {
86      eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
87    }
88  
89    /**
90     * Dispatch {@code events} in the order they were posted, regardless of
91     * the posting thread.
92     */
93    @SuppressWarnings("deprecation") // only deprecated for external subclasses
94    @Override
95    protected void dispatchQueuedEvents() {
96      while (true) {
97        EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
98        if (eventWithSubscriber == null) {
99          break;
100       }
101 
102       dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
103     }
104   }
105 
106   /**
107    * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
108    */
109   @Override
110   void dispatch(final Object event, final EventSubscriber subscriber) {
111     checkNotNull(event);
112     checkNotNull(subscriber);
113     executor.execute(
114         new Runnable() {
115           @Override
116           public void run() {
117             AsyncEventBus.super.dispatch(event, subscriber);
118           }
119         });
120   }
121 }